// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

import com.amazonaws.services.s3.model.ListObjectsRequest
import java.util.function.Supplier

suite("test_cold_data_compaction_fault_injection", "nonConcurrent") {
    GetDebugPoint().clearDebugPointsForAllBEs()
    def retryUntilTimeout = { int timeoutSecond, Supplier<Boolean> closure ->
        long start = System.currentTimeMillis()
        while (true) {
            if (closure.get()) {
                return
            } else {
                if (System.currentTimeMillis() - start > timeoutSecond * 1000) {
                    throw new RuntimeException("" +
                            "Operation timeout, maybe you need to check " +
                            "remove_unused_remote_files_interval_sec and " +
                            "cold_data_compaction_interval_sec in be.conf")
                } else {
                    sleep(10_000)
                }
            }
        }
    }
    def tabletName = "test_cold_data_compaction_fault_injection"

    String suffix = UUID.randomUUID().hashCode().abs().toString()
    String s3Prefix = "regression/cold_data_compaction/${suffix}"
    multi_sql """
            DROP TABLE IF EXISTS ${tabletName} force;
            DROP STORAGE POLICY IF EXISTS test_policy_${suffix};
            DROP RESOURCE IF EXISTS 'remote_s3_${suffix}';

            CREATE RESOURCE "remote_s3_${suffix}"
            PROPERTIES
            (
                "type" = "s3",
                "s3.endpoint" = "${getS3Endpoint()}",
                "s3.region" = "${getS3Region()}",
                "s3.bucket" = "${getS3BucketName()}",
                "s3.root.path" = "${s3Prefix}",
                "s3.access_key" = "${getS3AK()}",
                "s3.secret_key" = "${getS3SK()}",
                "s3.connection.maximum" = "50",
                "s3.connection.request.timeout" = "3000",
                "s3.connection.timeout" = "1000"
            );
            CREATE STORAGE POLICY test_policy_${suffix}
            PROPERTIES(
                "storage_resource" = "remote_s3_${suffix}",
                "cooldown_ttl" = "5"
            );
            CREATE TABLE IF NOT EXISTS ${tabletName}
            (
                k1 BIGINT,
                k2 LARGEINT,
                v1 VARCHAR(2048)
            )
            DISTRIBUTED BY HASH (k1) BUCKETS 1
            PROPERTIES(
                "storage_policy" = "test_policy_${suffix}",
                "disable_auto_compaction" = "true",
                "replication_num" = "1"
            );
        """

    // insert 5 RowSets
    multi_sql """
        insert into ${tabletName} values(1, 1, 'Tom');
        insert into ${tabletName} values(2, 2, 'Jelly');
        insert into ${tabletName} values(3, 3, 'Spike');
        insert into ${tabletName} values(4, 4, 'Tyke');
        insert into ${tabletName} values(5, 5, 'Tuffy');
    """

    // wait until files upload to S3
    retryUntilTimeout(900, {
        def res = sql_return_maparray "show data from ${tabletName}"
        String size = ""
        String remoteSize = ""
        for (final def line in res) {
            if (tabletName.equals(line.TableName)) {
                size = line.Size
                remoteSize = line.RemoteSize
                break
            }
        }
        logger.info("waiting for data to be uploaded to S3: ${tabletName}'s local data size: ${size}, remote data size: ${remoteSize}")
        return size.startsWith("0") && !remoteSize.startsWith("0")
    })

    String tabletId = sql_return_maparray("show tablets from ${tabletName}")[0].TabletId
    // check number of remote files
    def filesBeforeCompaction = getS3Client().listObjects(
            new ListObjectsRequest().withBucketName(getS3BucketName()).withPrefix(s3Prefix + "/data/${tabletId}")).getObjectSummaries()

    // 5 RowSets + 1 meta
    assertEquals(6, filesBeforeCompaction.size())

    try {
        GetDebugPoint().clearDebugPointsForAllBEs()
        GetDebugPoint().enableDebugPointForAllBEs("Tablet._calc_cumulative_compaction_score.return")
        // trigger cold data compaction
        sql """alter table ${tabletName} set ("disable_auto_compaction" = "false")"""

        // wait until compaction finish
        retryUntilTimeout(900, {
            def filesAfterCompaction = getS3Client().listObjects(
                    new ListObjectsRequest().withBucketName(getS3BucketName()).withPrefix(s3Prefix+ "/data/${tabletId}")).getObjectSummaries()
            logger.info("${tabletName}'s remote file number is ${filesAfterCompaction.size()}")
            // 1 RowSet + 1 meta
            return filesAfterCompaction.size() == 7
        })

        sql "drop table ${tabletName} force"
    } finally {
        GetDebugPoint().disableDebugPointForAllBEs("Tablet._calc_cumulative_compaction_score.return")
        GetDebugPoint().clearDebugPointsForAllBEs()
    }
}
